AWS Lambdas nos permiten ejecutar funciones de manera aislada.
Para leer los parámetros que se le pasan a una lambda utilizamos el parámetro event.
Si hemos definido variables en el json de configuración simplemente podemos acceder a ellas asi:
event['nomber_variable']
Si estamos llamando a la Lambda utilizando una URL, podemos hacer con una solicitud post o get y por lo tanto pasarle parámetros en el body del post o query parameters en la URL.
Para obtener el body del post lo hacemos de la siguiente manera:
post_body = json.loads(event['body'])
Y para los query parameters lo hacemos asi:
query_parameters = event["queryStringParameters"]
Este es un ejemplo en el que se obtienen los dos tipos de parámetros:
import json
def lambda_handler(event, context):
post_body = json.loads(event['body'])
query_parameters = event["queryStringParameters"]
post_parameter = post_body['id']
my_query_param = query_parameters['name']
return {
'statusCode': 200,
'body': json.dumps('id is: ' + post_parameter + ' and name is: ' + my_query_param)
}
El siguiente ejemplo muestra como obtener los feeds de la API de Instagram, para ello obtiene el token para hacer la petición desde Parameter Store, hace la petición http con dicho token y despues los resultados los almacena en DynamoDB:
import boto3
import os
import time
from botocore.vendored import requests
url_instagram = 'https://graph.instagram.com/me/media'
url_refresh_token='https://graph.instagram.com/refresh_access_token'
access_token = None
ssm = None
table_feed = None
def lambda_handler(event, context):
#Init Simple Systems Manager for access to Parameter Store
init_ssm()
refresh_token()
init_dynamo()
return get_feeds()
def init_ssm():
parameter_name = '/Backend/SocialNetworks/AccessToken/Instagram'
global access_token
global ssm
ssm = boto3.client('ssm')
parameter = ssm.get_parameter(Name=parameter_name, WithDecryption=True)
access_token = parameter['Parameter']['Value']
def init_dynamo():
global table_feed
dynamodb = boto3.resource("dynamodb")
table_feed = dynamodb.Table("feedcontent3")
def refresh_token():
global access_token
global ssm
params_token = {
'grant_type': 'ig_refresh_token',
'access_token': access_token
}
response_token = requests.get(url_refresh_token, params=params_token)
if response_token.status_code == 200:
new_token = response_token.json()
else:
raise Exception('Error: cannot refresh token')
ssm.put_parameter(Name='/Backend/SocialNetworks/AccessToken/Instagram', Value=new_token['access_token'], Overwrite=True)
access_token = new_token['access_token']
def get_feeds():
global access_token
global url_instagram
global table_feed
params_feed = {
'fields': 'id,caption,timestamp,media_url',
'access_token': access_token
}
response = requests.get(url_instagram, params=params_feed)
if response.status_code == 200:
data = response.json()
else:
raise Exception('Error: cannot get feeds from Instagram')
for post in data["data"]:
caption = ""
if post.get("caption") != None:
caption = post["caption"];
try:
table_feed.put_item(Item={"PostId": post["id"], "caption": caption, "timestamp": post["timestamp"], "mediaurl": post["media_url"], "tiempo": int(time.time())}, ConditionExpression='attribute_not_exists(PostId)')
except:
continue
return {"message": "Success getting feeds from Instagram"}
Para poder ejecutar esta lambda tenemos que configurar los siguientes permisos:
AWS | Lambdas